Netty 的 Channel、Promise、Pipeline 详解

您所在的位置:网站首页 netty channelhandlercontext Netty 的 Channel、Promise、Pipeline 详解

Netty 的 Channel、Promise、Pipeline 详解

2024-07-14 21:13| 来源: 网络整理| 查看: 265

Netty Demo 示例

首先通过一个示例来分析,创建一个 NioServerSocketChannel 监听本机端口 11111 的 Socket 连接,将收到的消息原样返回;然后再创建一个 NioSocketChannel,发起对本机的 11111 端口的 Socket 连接,发送字符串 ”Netty rocks!“。预期能收到服务端返回的 “Netty rocks!” 响应。

Maven 依赖

本文使用的 Netty 版本是 5.0.0.Alpha2,与 4.x 版本相比变化还是挺大的。pom 文件添加:

代码语言:javascript复制 io.netty netty-all 5.0.0.Alpha2 创建一个 Server

创建一个 NioServerSocketChannel,监听本机端口 11111 的 Socket 连接。

代码语言:javascript复制public class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public static void main(String[] args) throws InterruptedException { new EchoServer(11111).start(); } public void start() throws InterruptedException { final EchoServerHandler serverHandler = new EchoServerHandler(); NioEventLoopGroup group = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(group).channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(serverHandler); } }); ChannelFuture channelFuture = b.bind().sync(); channelFuture.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } } }

EchoServerHandler 的实现如下,在 channelRead 时将数据写入 ChannelHandlerContext,并将数据输出到控制台。

代码语言:javascript复制@ChannelHandler.Sharable public class EchoServerHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf) msg; System.out.println("Server received : " + in.toString(CharsetUtil.UTF_8)); ctx.write(in); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.writeAndFlush(Unpooled.EMPTY_BUFFER) .addListener(ChannelFutureListener.CLOSE); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }创建一个 Client

创建一个 NioSocketChannel,发起对本机的 11111 端口的 Socket 连接。

代码语言:javascript复制public class EchoClient { private final String host; private final int port; public EchoClient(String host, int port) { this.host = host; this.port = port; } public static void main(String[] args) throws InterruptedException { new EchoClient("localhost", 11111).start(); } public void start() throws InterruptedException { NioEventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioSocketChannel.class) .remoteAddress(new InetSocketAddress(host, port)) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new EchoClientHandler()); } }); ChannelFuture channelFuture = bootstrap.connect().sync(); channelFuture.channel().closeFuture().sync(); }finally { group.shutdownGracefully().sync(); } } }

EchoClientHandler 的实现如下,messageReceived(在 Netty 4.x 为 channelRead0)对于泛型 I(本例中是 ByteBuf)进行处理,将数据输出到控制台。

代码语言:javascript复制@ChannelHandler.Sharable public class EchoClientHandler extends SimpleChannelInboundHandler { @Override public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } @Override public void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println( "Client received: " + msg.toString(CharsetUtil.UTF_8)); } }io.netty.channel.Channel 类

上面 demo 中直接使用到的 2 个类是:NioServerSocketChannel 和 NioSocketChannel,这两个类底层都是实现了 Channel 接口,注意这个 Channel 接口是 io.netty.channel.Channel,而不是 JDK 自带的 java.nio.channels.Channel!两个类的继承关系如下:

NioServerSocketChannel

NioSocketChannel

Channel 提供应用程序网络套接字或其他组件连接,提供读、写、连接和绑定等 I/O 操作。

Channel 的当前状态(开启、关闭)Channel 的配置参数(接收缓冲区大小)I/O 操作(读、写、连接、绑定)ChannelPipeline,处理所有与 Channel 绑定的 I/O 事件和请求所有 I/O 操作都是异步的

Netty 中所有 I/O 操作都是异步的。这意味着所有的 I/O 调用都会立即返回,不能保证在调用结束时请求的 I/O 操作是否完成。调用者会得到一个 ChannelFuture 实例,该实例会在请求的 I/O 操作成功、失败、取消时通知调用者。

Channel 是分层级的

Channel 可以有 parent,这取决于 Channel 的创建方式。例如被 ServerSocketChannel 接收的 SocketChannel,会得到一个 ServerSocketChannel 作为它的 parent。

释放资源

使用完毕后,调用 close() 或 close(ChannelPromise) 释放资源非常重要。

ChannelFuture

ChannelFuture 是一个异步 Channel I/O 操作的结果。如上面所说,Netty 中所有 I/O 操作都是异步的。这意味着所有的 I/O 调用都会立即返回,不能保证在调用结束时请求的 I/O 操作是否完成。调用者会得到一个 ChannelFuture 实例。

ChannelFuture 只有 2 种状态:未完成、已完成。I/O 操作开始时,将会创建一个新的 ChannelFuture 对象,初始时是未完成状态 —— 不是成功、失败或取消的任何一种状态,因为 I/O 操作还没有完成。如果 I/O 操作结束(无论成功、失败、取消),ChannelFuture 都会处于完成状态。注意即使是失败也属于完成状态。

代码语言:javascript复制 +---------------------------+ | Completed successfully | +---------------------------+ +----> isDone() = true | +--------------------------+ | | isSuccess() = true | | Uncompleted | | +===========================+ +--------------------------+ | | Completed with failure | | isDone() = false | | +---------------------------+ | isSuccess() = false |----+----> isDone() = true | | isCancelled() = false | | | cause() = non-null | | cause() = null | | +===========================+ +--------------------------+ | | Completed by cancellation | | +---------------------------+ +----> isDone() = true | | isCancelled() = true | +---------------------------+

我们还可以添加 ChannelFutureListener,以便在 I/O 操作完成时收到通知。

使用 addListener(GenericFutureListener) 而不是 await()

addListener(GenericFutureListener) 是非阻塞的,只需要将特定的 ChannelFutureListener 添加到ChannelFuture 即可,I/O 线程会在 ChannelFuture 绑定的 I/O 操作完成时通知监听器。ChannelFutureListener 完全非阻塞,因此效率极高。

而 await() 是阻塞操作,一旦调用,调用者线程就会阻塞直到操作完成。使用 await() 操作更容易,但是成本更高。此外,在特定的情况下还可能出现死锁。

使用 ChannelHandler 而不是 await()

ChannelHandler 中的事件处理方法通常由 I/O 线程调用,如果 await() 是由事件处理方法(I/O 线程)调用的,那么它正在等待的 I/O 操作可能永远也不会完成,因为 await() 方法可以阻止它正在等待的 I/O 操作,也就是发生了死锁。

代码语言:javascript复制 // BAD - NEVER DO THIS @Override public void channelRead(ChannelHandlerContext ctx, GoodByeMessage msg) { ChannelFuture future = ctx.channel().close(); future.awaitUninterruptibly(); // Perform post-closure operation // ... } // GOOD @Override public void channelRead(ChannelHandlerContext ctx, GoodByeMessage msg) { ChannelFuture future = ctx.channel().close(); future.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) { // Perform post-closure operation // ... } }); }创建 Channel

在 Bootstrap(客户端) 和 ServerBootstrap(服务端) 的启动过程中都会调用 AbstractBootstrap#channel(…) 方法(参考文章开头的 Demo):

代码语言:javascript复制public B channel(Class


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3